package com.bw.gmall.realtime.dwd.db.split.func;

import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.bean.TableProcessDwd;
import com.bw.gmall.realtime.common.util.JdbcUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.util.HashMap;
import java.util.List;

/**
 * @projectName: gmalll2206
 * @package: com.bw.gmall.realtime.dwd.db.split.func
 * @className: DwdProcessFunction
 * @author: DuanYu
 * @description: TODO
 * @date: 2025/2/27 14:01
 * @version: 1.0
 */

public class DwdProcessFunction extends BroadcastProcessFunction<JSONObject, TableProcessDwd, Tuple2<JSONObject, TableProcessDwd>> {

    private final MapStateDescriptor<String, TableProcessDwd> mapDescriptor;
    private HashMap<String, TableProcessDwd> hashMap = new HashMap<>();

    public DwdProcessFunction(MapStateDescriptor<String, TableProcessDwd> mapDescriptor) {
        this.mapDescriptor = mapDescriptor;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Connection mysqlConnection = JdbcUtil.getMysqlConnection();
        List<TableProcessDwd> tableProcessDwds = JdbcUtil.queryList(mysqlConnection, "select * from gmall2023_config.table_process_dwd", TableProcessDwd.class, true);
        for (TableProcessDwd tableProcessDwd : tableProcessDwds) {
            hashMap.put(tableProcessDwd.getSourceTable() + '-' + tableProcessDwd.getSourceType(), tableProcessDwd);
        }
    }

    @Override
    public void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, TableProcessDwd, Tuple2<JSONObject, TableProcessDwd>>.ReadOnlyContext readOnlyContext, Collector<Tuple2<JSONObject, TableProcessDwd>> collector) throws Exception {
        ReadOnlyBroadcastState<String, TableProcessDwd> broadcastState = readOnlyContext.getBroadcastState(mapDescriptor);
        String table = jsonObject.getString("table");// user-info       coupon
        String type = jsonObject.getString("type");// insert update delete
        // user_info-insert
        // user_info-update
        String key = table + "-" + type;
        TableProcessDwd tableProcessDwd = broadcastState.get(key);

        if (tableProcessDwd == null) {
            tableProcessDwd = hashMap.get(key);
        }
        if (tableProcessDwd != null) {
            collector.collect(Tuple2.of(jsonObject, tableProcessDwd));
        }
    }

    @Override
    public void processBroadcastElement(TableProcessDwd tableProcessDwd, BroadcastProcessFunction<JSONObject, TableProcessDwd, Tuple2<JSONObject, TableProcessDwd>>.Context context, Collector<Tuple2<JSONObject, TableProcessDwd>> collector) throws Exception {
        BroadcastState<String, TableProcessDwd> broadcastState = context.getBroadcastState(mapDescriptor);
        String op = tableProcessDwd.getOp();
        String sourceType = tableProcessDwd.getSourceType();

        String key = tableProcessDwd.getSourceTable() + "-" + sourceType;
        if ("d".equals(op)) {
            broadcastState.remove(key);
            hashMap.remove(key);
        } else {
            broadcastState.put(key, tableProcessDwd);
        }
    }
}